home *** CD-ROM | disk | FTP | other *** search
/ Libris Britannia 4 / science library(b).zip / science library(b) / DDJMAG / DDJ9212.ZIP / hyprcube.asc < prev    next >
Text File  |  1992-11-30  |  66KB  |  1,838 lines

  1. _SIMULATING HYPERCUBES IN UNIX_
  2. by Jeffery W. Hamilton and Eileen M. Ormsby
  3.  
  4. [LISTING ONE]
  5.  
  6. /***** cube.h *****/
  7.  
  8. /* Hypercube Simulation definitions */
  9. #define NUMBER_IN_PART 4            /* number of nodes in partition */
  10. #define PM_PORT        6000
  11.  
  12. /* Maximum message sent between nodes */
  13. #define MAX_MESSAGE_SIZE (1024 * 16)
  14.  
  15. typedef struct {
  16.   char *name;             /* network name of the computer hosting partition */
  17.   int  socket;            /* file descriptor for the socket */
  18.   int  errfdp;            /* file descriptor for sending "kill" values */
  19.   struct sockaddr_in addr;
  20. } subpart;
  21.  
  22. typedef struct {
  23.   int type;      /* message type sent with the message -1 or greater */
  24.   int spid;      /* sender's group number (pid) */
  25.  
  26.   int snode;     /* sender's node number */
  27.   int dnode;     /* node this message is destined for */
  28.   int t_length;  /* total length of message */
  29.   int length;    /* length of the message */
  30.   char valid[NUMBER_IN_PART+2];     /* 0= no message */
  31.   char msg[MAX_MESSAGE_SIZE];     /* Actual message contents */
  32.  
  33.  
  34. [LISTING TWO]
  35.  
  36. /***** pm.c *****/
  37. /* PARTITION MANAGER -- This program will run on all partitions used for an
  38. ** application. It is started via a remote execution call from "load".
  39. ** The main program gets the input arguments, sets a few variables and calls
  40. ** the Partition Manager subroutine which performs the following functions:
  41. **  determines local partition information; allocates neccessary partition 
  42. **  structures; sets up interrupt handling to free system resources when the
  43. **  application is terminated; sets up server portion of socket communications;
  44. **  forks a client PM that sets up client portion of the socket communications,
  45. **  waits for a node to request data to be sent to a partition, and sends data
  46. **  over the sockets; forks and execs application children; performs server PM
  47. **  functions that waits to receive data from the sockets and notifies 
  48. **  appropriate nodes when data has arrived. 
  49. ** The PM server only receives data for its nodes, and the PM client sends 
  50. ** data to a remote partition.
  51. **    BASIC SOFTWARE ARCHITECTURE: The load module in the simcube library will:
  52. **    1) Read the .pmrc file; 2) Determine how many partitions will be used 
  53. **    for this application; 3) Fork and exec a local PM and the appropriate 
  54. **    number of remote PMs PM is passed the name of the application program,
  55. **    its partition number, the key value for PM to communicate to the host 
  56. **    process with the group number, the total number of application nodes,
  57. **    the names of the other partitions running this application.
  58. **   The initialization portion (init_simulator) which is called by the
  59. **   application processes (nodes) will set up interrupt handling and create
  60. **   the shared memory and semaphores necessary for communications
  61. **   between local nodes and the Partition Manager. 
  62. */
  63.  
  64. /* These functions allow a UNIX system to simulate a hypercube environment. */
  65. #include <sys/types.h>
  66. #include <errno.h>
  67. #include <stdio.h>
  68. #include <fcntl.h>
  69. #include <sys/ioctl.h>
  70. #include <sys/sem.h>
  71. #include <sys/shm.h>
  72. #include <sys/ipc.h>
  73. #include <sys/wait.h>
  74. #include <signal.h>
  75. #include <sys/socket.h>
  76. #include <netinet/in.h>
  77. #include <netdb.h>
  78. #include <string.h>
  79. #include <sys/time.h>
  80. #include <sys/utsname.h>
  81. #include <sys/param.h>
  82. #include "cube.h"
  83.  
  84.  
  85. #define  NUM_TRIES 60
  86. #define  min(x,y) (((x) < (y)) ? (x) : (y))
  87.  
  88. /* Function Prototypes */
  89. void *malloc(int size);
  90. void *shmat(int, void*, int);
  91.  
  92. int pm (char *filename, char *pmsites[]);
  93. void setup_server_sockets(void);
  94. void pm_server(void);
  95. void setup_client_sockets(void);
  96. void pm_getmsg(void);
  97. void abort_prog(void);
  98. void sig_terminate(int sig, int code, struct sigcontext *scp);
  99. void unexpected_death(int sig, int code, struct sigcontext *scp);
  100. int _killcube(int node, int pid);
  101. int init_shared_mem(void **pointer, int size, int key);
  102. int init_semaphore(int *semid, int size, int value, int key);
  103. int semcall_all(int semid, int size, int operation);
  104. int semcall_one(int semid, int num, int operation);
  105. int numnodes(void);
  106. int numparts(void);
  107. int mypart(void);
  108. int partof(int node);
  109. int pm_partof(int node);
  110. int numbuffers(void);
  111. int mybuffer(void);
  112. int bufferof(int node);
  113. int mynode(void);
  114. int myhost(void);
  115. void pm_client(void);
  116.  
  117. /* Local, Private Information */
  118. fd_set node_part_set, temp_set;
  119.              /* node_part_set is the variable that FD_XXX commands are */
  120.              /* applied to. Definitions of fd_set structure, and FD_ZERO, */
  121.              /* FD_SET, FD_CLR, and FD_ISSET macros are in <sys/types.h> */
  122.              /* node_part_set will have socket file descriptors.*/
  123. static int num_parts;         /* number of partitions */
  124. static int my_part;           /* partition this process is in */
  125. static int nodes_in_part;     /* number of nodes in this partition */
  126. static subpart *partition;    /* list of partition information */
  127. static int base;              /* base key value for allocating shared data */
  128. static int my_node;           /* node number for this process */
  129. static int my_group;          /* group id for this process */
  130.  
  131.                               /* There are two groups, host communications */
  132.                               /* and inter-node communications */
  133. static int num_nodes;         /* total number of nodes in all partitions */
  134.  
  135. static int msgavail = -1;     /* semaphores indicating message is available */
  136. static int msgfree = -1;      /* semaphores indicating buffer is free */
  137. static int next_message = -1; /* which message is to be received next */
  138. static int shmid_m = -1;      /* id of shared area for messages */
  139. static message *buffer = NULL;/* communication areas */
  140. static int *children = NULL;  /* process ids of all child processes */
  141. static int child_index = 0;   /* number of children created */
  142. static int pmserver_pid = 0;  /* pid of pmserver */
  143.  
  144. /* Main:  reads arguments from command line, places them in local variables
  145. ** and calls pm. (Local variables are not necessary, but enhances readability)
  146. ** NOTE:  ONLY TEN NODES (PM SITES) ARE READ FROM THE COMMAND LINE */
  147. int main(int argc, char *argv[])
  148. {
  149.    char *filename;
  150.    char *pmsites[16];
  151.    int  i;
  152.    if (argc < 7 ) {
  153.       fprintf (stderr, "PM main: error not enough arguments\n");
  154.       fflush(stderr);
  155.       exit(-1);
  156.    }
  157.    filename = argv[1];
  158.    my_part = atoi(argv[2]);
  159.    base = atoi(argv[3]);
  160.    my_group = atoi(argv[4]);
  161.    num_nodes = atoi(argv[5]);
  162.    for (i = 0; i < argc - 6; i++) {
  163.      pmsites[i] = argv[i + 6];
  164.    }
  165.    pm (filename, pmsites);
  166. }
  167.  
  168. /* pm -- Determines partition information, sets up signal handling, sets up
  169. ** server sockets, forks client pm, forks application children. PM splits the 
  170. ** application into NUMBER_IN_PART processes. The partition number is passed 
  171. ** as an input parameter. The starting node number is the partition number 
  172. ** NUMBER_IN_PART and remaining processes will be numbered consecutively.
  173. ** Shared memory will be allocated to serve as a communications vehicle within 
  174. ** a partition. Sockets used between partitions to allow multiple UNIX systems
  175. ** to be combined to create a larger set of CPUs to be applied to a problem. */
  176. int pm (char *filename, char *pmsites[])
  177. {
  178.    register int i, pid;
  179.    char temp[128];              /* used to set up environment variables */
  180.    char part_names[64];
  181.    int  start_node;
  182.    int  dest_node;
  183.    /* Determine how many other partitions exist */
  184.    num_parts = (num_nodes + NUMBER_IN_PART - 1) / NUMBER_IN_PART;
  185.    /* Determine which node is the first for this partition */
  186.    start_node = mypart() * NUMBER_IN_PART;
  187.    /* Determine how many nodes are in this partition (1-4) */
  188.    nodes_in_part = numnodes() - (mypart() * NUMBER_IN_PART);
  189.    nodes_in_part = min(NUMBER_IN_PART, nodes_in_part);
  190.    /* Set PM's node to be the last node on this partition */
  191.    /* (The children will be start_node through start_node + nodes_in_part-1) */
  192.    my_node = nodes_in_part;
  193.    /* Create the structure to hold the partition names and socket fds */
  194.    if ((partition = malloc(num_parts * sizeof(subpart))) == NULL) {
  195.       fprintf(stderr,"PM %d SERVER: insufficient memory\n, mypart()");
  196.       fflush(stderr);
  197.       return -1;
  198.    }
  199.    memset(partition, 0, num_parts * sizeof(subpart));
  200.    /* Catch these signals so PM can notify children to clean up */
  201.    signal(SIGINT,sig_terminate);
  202.    signal(SIGTERM,sig_terminate);
  203.    signal(SIGQUIT,sig_terminate);
  204.    /* Watch for unexpected deaths */
  205.    signal(SIGCHLD, unexpected_death);
  206.    /* Create, bind, and listen on sockets */
  207.    setup_server_sockets();
  208.    if (mypart() != 0) {
  209.       /* Only change the base on partitions that are not the one that includes
  210.       ** host. That partition requires same base that host session is using. */
  211.  
  212.       base = getpid();
  213.    }
  214.    /* Allocate shared memory */
  215.    shmid_m = init_shared_mem(&buffer, sizeof(message) * numbuffers(), base);
  216.    if (mypart() != 0) {
  217.       memset(buffer, 0, sizeof(message) * numbuffers());
  218.    }
  219.    /* Allocate communications semaphores */
  220.    init_semaphore(&msgavail, numbuffers(), 0, base+10000);
  221.    init_semaphore(&msgfree, numbuffers(), 0, base+20000);
  222.    /* Flush stdout and stderr before doing a fork, so child doesn't inherit */
  223.    fflush(stdout);
  224.    fflush(stderr);
  225.    /* Fork PM CLIENT here */
  226.    if ((pmserver_pid = fork()) < 0) {
  227.       /* Can't create the PM CLIENT */
  228.       _killcube(0, 0);
  229.       fprintf(stderr, "PM %d SERVER: unable to create PM CLIENT process %d\n",
  230.               mypart(), i);
  231.       fflush(stderr);
  232.       return -1;
  233.    } else if (pmserver_pid == 0) {
  234.       /* Fill in the names of the other sites in the partition structure and
  235.       ** close the socket file desciptors that this process just inherited. */
  236.       for (i = 0; i < num_parts; i++) {
  237.          if (mypart() != i) {
  238.             partition[i].name = pmsites[i];
  239.             close(partition[i].socket);
  240.          }
  241.       }
  242.       /* CALL CLIENT SUBROUTINES */
  243.       setup_client_sockets();
  244.       pm_client();
  245.    } else {
  246.       /* SERVER: forks application children then calls pm_server subroutine */
  247.       /* Read from pmsites array, create a comma delimited string for env */
  248.       part_names[0] = '\0';
  249.       for (i = 0; i < num_parts; i ++) {
  250.          strcat(part_names, pmsites[i]);
  251.          strcat(part_names, ",");
  252.       }
  253.       /* Allocate space for child pids */
  254.       if ((children = malloc(nodes_in_part * sizeof(int))) == NULL) {
  255.          fprintf(stderr,"PM %d SERVER: insufficient memory\n", mypart());
  256.          fflush(stderr);
  257.          return -1;
  258.       }
  259.       /* Load all nodes within this partition */
  260.       for (i = start_node; (i < start_node + nodes_in_part); i++) {
  261.          if ((pid = fork()) < 0) {
  262.             /* Can't create all the children! */
  263.             _killcube(0, 0);
  264.             fprintf(stderr, "PM %d SERVER: unable to create node process %d\n",
  265.                     mypart(), i);
  266.             fflush(stderr);
  267.             return -1;
  268.          } else if (pid == 0) {
  269.             /* I'm the child process */
  270.             /* Start the node program */
  271.             my_node = i;
  272.             sprintf(temp, "SIM_INFO=%d,%d,%d,%d,%s",
  273.                     base,my_node,my_group,num_nodes,part_names);
  274.             if (putenv(temp) != 0) {
  275.                fprintf(stderr,
  276.                        "PM %d SERVER: Insufficient room to add env variable\n",
  277.                        my_node);
  278.                fflush(stderr);
  279.                return -1;
  280.             }
  281.             execlp(filename,filename,NULL);
  282.             /* If we get here, we had a problem */
  283.             perror("execlp");
  284.             fprintf(stderr,"PM %d SERVER: error execing node=%d file=%s 
  285.                     errno=%d\n", mypart(), my_node, filename, errno);
  286.             fflush(stderr);
  287.             return -1;
  288.          } else {
  289.             /* I'm the parent process */
  290.             children[child_index++] = pid;
  291.          }
  292.       }
  293.       /* CALL SERVER SUBROUTINE */
  294.       pm_server();
  295.    } /* end if PM SERVER */
  296. }
  297. /* setup_server_sockets -- SERVER SOCKETS- for all partitions except ourself:
  298. **  Create a socket Bind the socket to a unique PORT id. (If the socket was 
  299. ** in use in a prior iteration, it may not have been reset yet - therefore we 
  300. ** loop a fixed number of times retrying.) Put a listen on socket. Put new 
  301. ** socket file descriptor into our set */
  302. static void setup_server_sockets(void)
  303.  
  304. {
  305.    int i, j;
  306.    struct sockaddr_in part_sock, tempaddr;
  307.    /* Zero out the set of partition sockets */
  308.    FD_ZERO(&node_part_set);
  309.    FD_ZERO(&temp_set);
  310.    for (i = 0; i < num_parts; i++) {
  311.       /* Skip ourself */
  312.       if (i == mypart () )
  313.         continue;
  314.       for (j = 0; j < NUM_TRIES; j++) {
  315.          /* Create a SERVER socket to receive data */
  316.          if ((partition[i].socket = socket(AF_INET, SOCK_STREAM, 0)) 
  317.                     < 0) {
  318.             fprintf(stderr, "PM %d SERVER: can't open stream socket, errno\n",
  319.                     mypart(), errno);
  320.             fflush(stderr);
  321.             exit (100);
  322.          }
  323.          /* Bind SERVER socket to local addr so partitions can send to it */
  324.          bzero((char*)&part_sock, sizeof(part_sock));
  325.          part_sock.sin_family = AF_INET;
  326.          part_sock.sin_addr.s_addr = htonl (INADDR_ANY);
  327.          /* Create unique SERVER socket port address, up to 16 per computer */
  328.          part_sock.sin_port = htons (PM_PORT + (mypart() << 4) + i);
  329.          /* If socket is still in use from prev iter, keep trying to bind */
  330.          if ((bind(partition[i].socket, &part_sock, 
  331.                      sizeof(part_sock))) < 0) {
  332.             if ((errno == EADDRINUSE) || (errno == EINTR)) {
  333.                /* Previous load hasn't shutdown yet, or we were interrupted. */
  334.                close(partition[i].socket);
  335.                sleep(2);
  336.             } else {
  337.                fprintf(stderr,"PM %d SERVER: can't bind local addr, 
  338.                        errno=%d\n", mypart(), errno);
  339.                fflush(stderr);
  340.                exit(100);
  341.             }
  342.          } else {
  343.             /* It worked, exit the loop */
  344.             break;
  345.          }
  346.       }
  347.       if (j == NUM_TRIES) {
  348.          /* Exceeded retry limit */
  349.          fprintf(stderr,"PM %d SERVER: can't bind local addr, errno=%d\n",
  350.                  mypart(), errno);
  351.          fflush(stderr);
  352.          exit(100);
  353.       }
  354.  
  355.       /* Issue a listen for the server sockets */
  356.       if (listen(partition[i].socket, 1) < 0) {
  357.          fprintf(stderr,"PM %d SERVER: can't listen on %d, errno = %d\n",
  358.                  mypart(),  partition[i].socket, errno);
  359.          fflush(stderr);
  360.          exit(100);
  361.       }
  362.       /* Set the bit for the socket file descriptor */
  363.       FD_SET(partition[i].socket, &node_part_set);
  364.    } /* end for setting up SERVER sockets */
  365. }
  366. /* pm_server -- SERVER- go into a receiving loop: Copy file desciptors to a 
  367. ** temporary set. Determine how many sockets are ready to be accepted. For 
  368. ** each file descriptor that is ready: Find file descriptor that is ready.
  369. ** If it is found in a partition's array of fd's then it is a base socket and 
  370. ** it is "accept"ed and added to the fd set. Else it is an fd that has data to
  371. ** be received. Receive the size of the message. Loop until entire message is 
  372. ** received. Clear the valid indicator bits. Inform nodes that a message has 
  373. ** arrived. If a broadcast message, set everyone's valid bit, and wait until 
  374. ** everyone receives it. Else verify that message belongs to a node on this 
  375. ** part and set that node's valid bit, wait until it is recvd. */
  376. static void pm_server(void)
  377. {
  378.    int    i, j;
  379.    int    accept_rdy;
  380.    int    newsockfd, templen;
  381.    int    size, count, partial;
  382.    char   *target;
  383.    struct sockaddr_in tempaddr;
  384.    /* forever, accept sockets and receive data */
  385.    for ( ; ; ) {
  386.       temp_set = node_part_set;
  387.       /* Determine how many sockets are ready to be accepted */
  388.       /* FD_SETSIZE is defined in <sys/types.h> to be 200 */
  389.       if ((accept_rdy = select( FD_SETSIZE, &temp_set, 0, 0, 0)) == -1) {
  390.          if (errno != 4) {
  391.             fprintf(stderr, "PM %d SERVER: error in select, errno = %d\n",
  392.                    mypart(), errno);
  393.             perror( "pm select" ) ;
  394.             fflush(stderr);
  395.  
  396.             _killcube(0,0);
  397.             exit(-1);
  398.          } else {
  399.             /* We were interrupted, try again */
  400.             continue;
  401.          }
  402.       }
  403.       for (i = 1; (accept_rdy != 0) && (i < FD_SETSIZE) ; i++) {
  404.          /* Find the file descriptor that needs servicing */
  405.          if ( FD_ISSET( i, &temp_set)) {
  406.             /* temporary modification */
  407.             /* accept_rdy--; */
  408.             accept_rdy = 0;
  409.             /* Examine each partition's array of fd's to find ready one */
  410.             for (j = 0; j < num_parts; j++) {
  411.                /* Skip examining our own partition */
  412.                if (j == mypart() )
  413.                  continue;
  414.                /* Since this matches our "base" socket, accept the socket */
  415.                if (i == partition[j].socket) {
  416.                   newsockfd = accept(partition[j].socket,
  417.                                  (struct sockaddr_in *)&tempaddr, &templen);
  418.                   FD_SET (newsockfd, &node_part_set);
  419.                   /* Found "base" socket, break out of for each part loop */
  420.                   break;
  421.                } /* end if base socket */
  422.             } /* end for check file descriptors in partition's array */
  423.             /* If it wasn't a base socket, then need to receive data */
  424.             if (j != num_parts) {
  425.                continue;
  426.             } else /* receive the data from the socket */ {
  427.                /* First receive the size of the message */
  428.                while (recv(i, &size, sizeof(size)) < 0) {
  429.                   if (errno != 22) {
  430.                     fprintf(stderr, "PM %d SERVER: recv size err, 
  431.                             errno=%d, fd=%d\n", mypart(),errno, i);
  432.                      fflush(stderr);
  433.                      _killcube(0,0);
  434.                      exit(-1);
  435.                   } else {
  436.                      fprintf(stderr, "PM %d SERVER: recv size err, errno=%d, 
  437.                             fd=%d\n", mypart(), errno, i);
  438.                      fflush(stderr);
  439.                   }
  440.                } /* end while recv msg */
  441.  
  442.                target = (char *) &buffer[nodes_in_part];
  443.                count = 0;
  444.                /* Now receive the message, it could come in pieces */
  445.                while (count < size) {
  446.                   if ((partial = recv(i, target, size - count)) < 0) {
  447.                      fprintf(stderr, "PM %d SERVER: Error recvng msg; 
  448.                             errno=%d\n", mypart(),errno);
  449.                      fflush(stderr);
  450.                      exit(-1);
  451.                   }
  452.                   count += partial;
  453.                   target += partial;
  454.                }
  455.                /* Make sure all valid bits are cleared */
  456.                memset(buffer[nodes_in_part].valid,0,
  457.                       sizeof(buffer[nodes_in_part].valid));
  458.                /* Tell the node(s) the message is there */
  459.                if (buffer[nodes_in_part].dnode == -1) {
  460.                   /* Broadcast the message to nodes in this partition */
  461.                   for (j=0; j < nodes_in_part; j++) {
  462.                      buffer[nodes_in_part].valid[j] = 1;
  463.                   }
  464.                   semcall_all(msgavail,nodes_in_part, 1);
  465.                   /* Wait until everyone receives the message */
  466.                   semcall_one(msgfree, nodes_in_part, -nodes_in_part);
  467.                } else {
  468.                  if (mypart() != partof(buffer[nodes_in_part].dnode))
  469.                                       {
  470.                   fprintf(stderr, "PM %d SERVER: Recvd msg for node %d 
  471.                                       not this partition\n",
  472.                              mypart(), buffer[nodes_in_part].dnode);
  473.                      fflush(stderr);
  474.                   } else {
  475.                      /* Point to point to another node in same partition */
  476.                      j = bufferof(buffer[nodes_in_part].dnode);
  477.                      buffer[nodes_in_part].valid[j] = 1;
  478.                      semcall_one(msgavail, j, 1);
  479.                      /* Wait until it is received */
  480.                      semcall_one(msgfree, nodes_in_part, -1);
  481.                   }
  482.                } /* endif broadcast message */
  483.             } /* endif receiving data from this socket */
  484.          } /* endif this socket */
  485.       } /* endfor */
  486.    } /* end forever receive messages on sockets */
  487. }
  488. /* setup_client_sockets -- Setting up CLIENT sockets- for all partitions 
  489. ** except ourself: Create a socket to send data. Look up address of host, 
  490. ** place in the sockaddr_in structure. Determine appropriate PORT id (needs to
  491. ** match with SERVER). (If socket was in use in a prior iteration, it may not 
  492. ** have been reset yet - therefore we loop a fixed number of times retrying.).
  493. ** Issue a connect for the socket */
  494. static void setup_client_sockets(void)
  495. {
  496.    int i, j;
  497.    struct hostent *hent;
  498.    /* Establish socket communications with other partitions */
  499.    for (i = 0; i < num_parts; i++) {
  500.       /* Skip ourself */
  501.       if (i == mypart () )
  502.         continue;
  503.       for (j = 0; j < NUM_TRIES; j++) {
  504.          /* Create a CLIENT socket to send data */
  505.          partition[i].socket = socket(AF_INET, SOCK_STREAM, 0);
  506.          /* Lookup host address and place in the socket address structure */
  507.          memset(&partition[i].addr, 0, sizeof(struct sockaddr_in));
  508.          partition[i].addr.sin_family = AF_INET;
  509.          if ((hent = gethostbyname(partition[i].name)) 
  510.                                == NULL) {
  511.             fprintf(stderr,"PM %d CLIENT: No entry for %d in /etc/hosts\n",
  512.                     mypart(), partition[i].name);
  513.             fflush(stderr);
  514.             exit(100);
  515.          }
  516.          memcpy(&partition[i].addr.sin_addr, hent->h_addr, 
  517.                                 hent->h_length);
  518.          partition[i].addr.sin_port = htons(PM_PORT + (i << 4) + 
  519.                                  mypart());
  520.          /* Connect to the socket */
  521.        if (connect(partition[i].socket, &partition[i].addr,
  522.                      sizeof(struct sockaddr_in)) < 0) {
  523.             if (errno == ECONNREFUSED) {
  524.                /* unsuccessful connect, sleep and try again */
  525.                sleep(3);
  526.             } else {
  527.                /* another error occurred, quit trying to connect */
  528.                j = NUM_TRIES;
  529.                break;
  530.             }
  531.          } else {
  532.             /* successful connect, break out of loop */
  533.             break;
  534.          } /* endif connect */
  535.       } /* endfor NUM_TRIES */
  536.       if (j == NUM_TRIES) {
  537.  
  538.          fprintf(stderr,
  539.                  "PM %d CLIENT: Unable to connect sock to %s, errno %d\n",
  540.                  mypart(), partition[i].name, errno);
  541.          fflush(stderr);
  542.          exit(100);
  543.       }
  544.    } /* end for setting up CLIENT sockets */
  545. }
  546. /* pm_client -- The PM CLIENT process sends data to partitions. Set up client 
  547. ** sockets. Send messages over the sockets: Get message. Send message (if it 
  548. ** is a broadcast message send it to all partitions, if not send it to 
  549. ** appropriate partition). Acknowledge sending of message. Release buffer. 
  550. ** Reset next message indicator. */
  551. static void pm_client(void)
  552. {
  553.    int  i, size;
  554.    /*  CLIENT- GO INTO INFINITE SENDING LOOP */
  555.    /* Initial setting to indicate the next message has not been selected */
  556.    next_message = -1;
  557.    /* Forever, wait for messages to send over socket */
  558.    for ( ; ; ) {
  559.       /* Get the message */
  560.       pm_getmsg();
  561.       /* Determine where to send the message */
  562.       if (buffer[next_message].dnode == -1) {
  563.          /* BROADCAST MESSAGE, SEND TO ALL PARTITIONS */
  564.          for (i = 0; i < numparts(); i++) {
  565.             /* Don't send broadcast to self */
  566.             if (i == mypart () )
  567.                continue;
  568.             /* First send the size of the message */
  569.             size = buffer[next_message].length;
  570.             if (send(partition[i].socket, &size, sizeof(size),0) 
  571.                         < 0) {
  572.                fprintf(stderr,
  573.                        "PM %d CLIENT: send to PM %d failed, errno=%d\n",
  574.  
  575.                        mypart(), i, errno);
  576.                fflush(stderr);
  577.                return -1;
  578.             }
  579.             /* Then send the actual message */
  580.             if (send(partition[i].socket, 
  581.                          &buffer[next_message], size, 0) < 0) {
  582.                fprintf(stderr,
  583.                        "PM %d CLIENT: send to PM %d failed, errno=%d\n",
  584.                        mypart(), i, errno);
  585.                fflush(stderr);
  586.                return -1;
  587.             }
  588.          } /* endfor SEND BROADCAST TO ALL PARTITIONS */
  589.       } else {
  590.          /* SEND TO A SPECIFIC PARTITION */
  591.          /* First send the size of the message */
  592.          size = buffer[next_message].length;
  593.          i = partof(buffer[next_message].dnode);
  594.          if (send(partition[i].socket, &size, sizeof(size),0) 
  595.                           < 0) {
  596.             fprintf(stderr,
  597.                     "PM %d CLIENT: send to PM %d failed, errno=%d\n",
  598.                     mypart(), i, errno);
  599.             fflush(stderr);
  600.             return -1;
  601.          }
  602.          /* Then send the actual message */
  603.          if (send(partition[i].socket, 
  604.                          &buffer[next_message], size, 0) < 0) {
  605.             fprintf(stderr,
  606.                     "PM %d CLIENT: send to PM %d failed, errno=%d\n",
  607.                     mypart(), i, errno);
  608.             return -1;
  609.          }
  610.       }
  611.       /* FOR BOTH BROADCAST AND REGULAR MESSAGES */
  612.       /* acknowledge the sending of the message */
  613.       buffer[next_message].valid[mybuffer()] = 0;
  614.       /* release (free) the buffer */
  615.       semcall_one(msgfree, next_message, 1);
  616.       /* reset next_message so the next getmsg will work */
  617.       next_message = -1;
  618.    } /* end forever CLIENT PROCESS sending messages over socket */
  619. }
  620. /***** Initialization and Termination routines *****/
  621. /* abort_prog -- Clean up in the case of an error */
  622. static void abort_prog(void)
  623. {
  624.    int i;
  625.    /* Remove the sets of semaphores */
  626.    if (pmserver_pid != 0) {
  627.       if (msgavail != -1) {
  628.          semctl(msgavail, 0, IPC_RMID, 0);
  629.          msgavail = -1;
  630.       }
  631.       if (msgfree != -1) {
  632.          semctl(msgfree, 0, IPC_RMID, 0);
  633.          msgfree = -1;
  634.       }
  635.    }
  636.    /* Remove the shared memory */
  637.    if (buffer != NULL) {
  638.       shmdt(buffer);
  639.       buffer = NULL;
  640.    }
  641.    /* Only PM SERVER process should execute this code */
  642.    if (pmserver_pid != 0) {
  643.       if (shmid_m != -1) {
  644.          shmctl(shmid_m, IPC_RMID, 0);
  645.          shmid_m = -1;
  646.       }
  647.    }
  648.    /* Close the sockets */
  649.    for (i = 0; i < num_parts; i++) {
  650.       if (i != mypart() ) {
  651.          close (partition[i].socket);
  652.          partition[i].socket = 0;
  653.       }
  654.    }
  655.    /* Make sure all pending output gets out */
  656.    fflush(stdout);
  657.    fflush(stderr);
  658. }
  659. /* Handle termination signals */
  660. void sig_terminate(int sig, int code, struct sigcontext *scp)
  661. {
  662.    int i;
  663.    /* Send termination signal to each of PM SERVER's children */
  664.  
  665.    if (pmserver_pid != 0) {
  666.       for (i = 0; i < child_index; i++) {
  667.          kill(children[i], SIGTERM);
  668.       }
  669.       child_index = 0;
  670.       kill(pmserver_pid, SIGTERM);
  671.    }
  672.    /* Clean up the use of semaphores and shared memory */
  673.    abort_prog();
  674.    exit(100);
  675. }
  676. /* Handle unexpected termination signals */
  677. void unexpected_death(int sig, int code, struct sigcontext *scp)
  678. {
  679.   int statval;
  680.   int waitpid;
  681.   /* Only PM SERVER process should execute this code */
  682.   if (pmserver_pid != 0) {
  683.      waitpid = wait(&statval);
  684.      if (waitpid < 0) {
  685.         printf("Error determining who died unexpectedly. Errno=%d\n", errno);
  686.      } else {
  687.         if (WIFSIGNALED(statval) != 0) {
  688.            printf("Process %d did not catch signal %d.\n",
  689.                   waitpid, WTERMSIG(statval));
  690.         } else if (WIFSTOPPED(statval) != 0) {
  691.            printf("Process %d stopped due to signal %d.\n",
  692.                   waitpid, WSTOPSIG(statval));
  693.         } else if (WIFEXITED(statval) == 0) {
  694.            /* Normal termination */
  695.         } else {
  696.            /* Terminated with exit code */
  697.         }
  698.      }
  699.   }
  700.   fflush(stdout);
  701. }
  702. /* killcube -- On abort, kill off all children on the hypercube partition */
  703. int _killcube(int node, int pid)
  704. {
  705.   int i;
  706.   int statval;
  707.   int waitpid;
  708.  
  709.   /* Only PM SERVER process should execute this code */
  710.   if (pmserver_pid != 0) {
  711.      for (i = 0; i < child_index; i++) {
  712.         kill(children[i], SIGTERM);
  713.      }
  714.      kill(pmserver_pid, SIGTERM);
  715.      for (i = 0; i <= child_index; i++) {
  716.         waitpid = wait(&statval);
  717.         if (waitpid < 0) {
  718.            /* No more children left */
  719.            break;
  720.         } else {
  721.            if (WIFSIGNALED(statval) != 0) {
  722.               printf("Process %d did not catch signal %d.\n",
  723.                      waitpid, WTERMSIG(statval));
  724.            } else if (WIFSTOPPED(statval) != 0) {
  725.               printf("Process %d stopped due to signal %d.\n",
  726.                      waitpid, WSTOPSIG(statval));
  727.            } else if (WIFEXITED(statval) == 0) {
  728.               /* Normal termination */
  729.            } else {
  730.               /* Terminated with exit code */
  731.            }
  732.         }
  733.      }
  734.   }
  735.   /* Clean up after ourself */
  736.   abort_prog();
  737.   child_index = 0;
  738.   return 0;
  739. }
  740. /* init_shared_mem -- Allocates a shared memory region. Sets pointer to region
  741. ** in this process's memory space and returns the shared memory identifier. */
  742. static int init_shared_mem(void **pointer, int size, int key)
  743. {
  744.   int shmid;
  745.   if ((shmid = shmget(key, size, 0666 | IPC_CREAT)) < 0) {
  746.     printf("init_shm: allocation of shared memory failed. Errno=%d\n",errno);
  747.     printf("      mynode=%d key=%d size=%d\n",my_node,key,size);
  748.     _killcube(0,0);
  749.     exit(-1);
  750.   }
  751.   *pointer = shmat(shmid, NULL, 0);
  752.   return shmid;
  753. }
  754.  
  755. /* init_semaphore -- Allocates a set of semaphores and initializes them */
  756. static int init_semaphore(int *semid, int size, int value, int key)
  757. {
  758.    register int i;
  759.    if ((*semid = semget(key, size, 0666 | IPC_CREAT)) < 0) {
  760.      printf("init_sem: allocation of semaphores failed. Errno=%d\n",errno);
  761.      printf("      mynode=%d key=%d size=%d\n",my_node,key,size);
  762.      _killcube(0,0);
  763.      exit(-1);
  764.    }
  765.    for (i = 0; i < size; i++) {
  766.       if (semctl(*semid, i, SETVAL, value) < 0) {
  767.         printf("init_sem: init of semaphores failed. Errno=%d\n",errno);
  768.         printf("      mynode=%d offset=%d value=%d\n",my_node,i,value);
  769.         _killcube(0,0);
  770.         exit(-1);
  771.       }
  772.    }
  773.    return *semid;
  774. }
  775. /* semcall_all --Perform same operation on all elements of semaphore at once.*/
  776. static int semcall_all(int semid, int size, int operation)
  777. {
  778.    struct sembuf sbuf[NUMBER_IN_PART+1];
  779.    register int i;
  780.    for (i = 0; i < size; i++) {
  781.       sbuf[i].sem_num = i;
  782.       sbuf[i].sem_op = operation;
  783.       sbuf[i].sem_flg = 0;
  784.    }
  785.    while (semop(semid, sbuf, size) < 0) {
  786.       /* repeat operation if interrupted */
  787.       if (errno != EINTR) {
  788.         printf("PM %d: Semaphore broadcast failed. Errno = %d\n",
  789.                mypart(), errno);
  790.         fflush(stdout);
  791.         return -1;
  792.       }
  793.    }
  794.    return 0;
  795. }
  796. /* semcall_one -- Perform an operation on an element of a semaphore. */
  797. static int semcall_one(int semid, int num, int operation)
  798. {
  799.    struct sembuf sbuf;
  800.    sbuf.sem_num = num;
  801.    sbuf.sem_op = operation;
  802.    sbuf.sem_flg = 0;
  803.    while (semop(semid, &sbuf, 1) < 0) {
  804.       /* repeat operation if interrupted */
  805.       if (errno != EINTR) {
  806.         printf("PM %d: Semaphore failed. Errno = %d\n", mypart(), errno);
  807.         fflush(stdout);
  808.         return -1;
  809.       }
  810.    }
  811.    return 0;
  812. }
  813. /***** Environment Information (External and Internal) *****/
  814. /* numnodes -- Returns the number of simulated nodes */
  815. int numnodes(void)
  816. {
  817.   return num_nodes;
  818. }
  819. /* numparts -- number of partitions */
  820. static int numparts(void)
  821. {
  822.   return num_parts;
  823. }
  824. /* mypart -- Partition this process is in */
  825. static int mypart(void)
  826. {
  827.   return my_part;
  828. }
  829.  
  830. /* partof -- Determines which partition a given node is a member of */
  831. static int partof(int n)
  832. {
  833.   if (n == myhost()) {
  834.     return 0;
  835.   } else {
  836.     return n / NUMBER_IN_PART;
  837.   }
  838. }
  839. /* pm_partof -- Determines which subpartition a given node is a member of
  840. ** A -1 can be passed if a destination node is broadcast, return -1. */
  841. static int pm_partof(int n)
  842. {
  843.   if (n == myhost()) {
  844.     return 0;
  845.   } else if (n == -1) {
  846.     return -1;
  847.   } else {
  848.     return n / NUMBER_IN_PART;
  849.   }
  850. }
  851. /* numbuffers -- Number of buffers in this partition */
  852. static int numbuffers(void)
  853. {
  854.   if (mypart() == 0) {
  855.     return (nodes_in_part + 2);
  856.   } else {
  857.     return (nodes_in_part + 1);
  858.   }
  859. }
  860. /* mybuffer -- returns the index for this process's buffer */
  861. static int mybuffer(void)
  862. {
  863.   return (nodes_in_part);
  864. }
  865.  
  866. /* bufferof -- Returns the buffer offset of the given node. Host is always 
  867. ** second to last buffer in partition 0. The PM is always the last buffer */
  868. static int bufferof(int n)
  869. {
  870.    if (mypart() != partof(n)) {
  871.       return nodes_in_part;             /* Return the buffer of PM */
  872.    } else if (n == myhost()) {
  873.       return nodes_in_part + 1;         /* This partition, buffer of host */
  874.    } else {
  875.       return n % NUMBER_IN_PART;        /* This partition, buffer of node */
  876.    }
  877. }
  878. /* mynode -- Returns the node number for this process */
  879. int mynode(void)
  880. {
  881.   return my_node;
  882. }
  883. /* myhost -- Returns the node number of the host */
  884. int myhost(void)
  885. {
  886.   return numnodes();
  887. }
  888. /***** Communications *****/
  889. /* pm_getmsg -- Wait until a message is available. This routine differs from 
  890. ** getmsg, in that it checks to ensure that destination node is not in this 
  891. ** partition. (Getmsg checks that current node equals destination node.) 
  892. ** OUTPUT: next_message - set to the message found of the proper type */
  893. static void pm_getmsg(void)
  894.  
  895. {
  896.    int i;
  897.    /* Only wait if a message is not already selected */
  898.    if (next_message != -1) return;
  899.    /* Wait for a message for me */
  900.    semcall_one(msgavail, mybuffer(), -1);
  901.    /* Search for those messages that are for me */
  902.    for (i = 0; i < numbuffers(); i++) {
  903.       if (buffer[i].valid[mybuffer()] != 0) {
  904.          next_message = i;
  905.          return;
  906.       }
  907.    }
  908. }
  909.  
  910.  
  911. [LISTING THREE]
  912.  
  913.  
  914. /*****   simulate.c  *****/
  915. /* These functions allow a UNIX system simulate a hypercube environment. */
  916. #include <stdio.h>
  917. #include <ctype.h>
  918. #include <sys/types.h>
  919. #include <errno.h>
  920. #include <sys/ipc.h>
  921. #include <sys/sem.h>
  922. #include <sys/shm.h>
  923. #include <sys/signal.h>
  924. #include <sys/wait.h>
  925. #include <sys/socket.h>
  926. #include <sys/in.h>
  927. #include <netdb.h>
  928. #include "cube.h"
  929.  
  930. /* Prototypes */
  931. char *getenv(char *variable);
  932. void *shmat(int shmid, void *shmaddr, int shmflg);
  933. char *strtok(char *, char *);
  934. char *strcpy(char *, char *);
  935. void *malloc(int size);
  936. #define min(x,y) (((x) < (y)) ? (x) : (y))
  937. int csend(int type, void *msg, int length, int target_node, int group);
  938. int crecv(int type, void *buf, int len);
  939. int killcube(int, int);
  940. int numnodes(void);
  941. int myhost(void);
  942. int mynode(void);
  943. int numparts(void);
  944. int numbuffers(void);
  945. int mybuffer(void);
  946. int bufferof(int node);
  947. int mypart(void);
  948. int partof(int node);
  949.  
  950. /* Local, Private Information */
  951. static int num_parts;         /* number of partitions */
  952. static int my_part;           /* partition this process is in */
  953. static int nodes_in_part;     /* number of nodes in this partition */
  954. static subpart *partition = NULL; /* list of partition information */
  955. static int base;              /* base key value for allocating shared data */
  956. static int my_node;           /* node number for this process */
  957. static int my_group;          /* group id for this process */
  958.                               /* There are two groups, host communications */
  959.                               /* and inter-node communications */
  960. static int num_nodes;         /* total number of nodes in all partitions */
  961. static int msgavail = -1;     /* semaphores indicating message is available */
  962. static int msgfree = -1;      /* semaphores indicating buffer is free */
  963. static int next_message;      /* which message is to be received next */
  964. static int shmid_m = -1;      /* id of shared area for messages */
  965. static message *buffer = NULL;/* communication areas */
  966. static int *children = NULL;  /* process ids of all child processes */
  967. static int child_index = 0;   /* number of children created */
  968.  
  969. / ** Initialization and Termination routines ** /
  970. /* abort_prog -- Clean up when the program terminates */
  971. void abort_prog(void)
  972. {
  973.    /* Remove the sets of semaphores */
  974.    if (mynode() == myhost()) {
  975.       if (msgavail != -1) {
  976.          semctl(msgavail, 0, IPC_RMID, 0);
  977.          msgavail = -1;
  978.       }
  979.       if (msgfree != -1) {
  980.          semctl(msgfree, 0, IPC_RMID, 0);
  981.          msgfree = -1;
  982.       }
  983.     }
  984.    /* Remove the shared memory */
  985.    if (buffer != NULL) {
  986.       shmdt(buffer);
  987.       buffer = NULL;
  988.    }
  989.    if (mynode() == myhost()) {
  990.       if (shmid_m != -1) {
  991.          shmctl(shmid_m, IPC_RMID, 0);
  992.          shmid_m = -1;
  993.       }
  994.    }
  995.    /* Make sure all pending output gets out */
  996.    fflush(stdout);
  997.    fflush(stderr);
  998. }
  999. /* Handle termination signals */
  1000. void sig_terminate(int sig, int code, struct sigcontext *scp)
  1001. {
  1002.   if (mynode() == myhost()) {
  1003.     /* Pass on the termination signal to the node processes */
  1004.     killcube(0,0);
  1005.   } else {
  1006.     /* This is executed by the node processes */
  1007.     /* Clean up the use of semaphores and shared memory */
  1008.     abort_prog();
  1009.   }
  1010.   exit(100);
  1011. }
  1012. /* Handle unexpected termination signals. Used by the host process. */
  1013. void unexpected_death(int sig, int code, struct sigcontext *scp)
  1014. {
  1015.   int statval;
  1016.   int waitpid;
  1017.   waitpid = wait(&statval);
  1018.   if (waitpid < 0) {
  1019.     printf("Error determining who died unexpectedly. Errno=%d\n", errno);
  1020.   } else {
  1021.     if (WIFSIGNALED(statval) != 0) {
  1022.       printf("Process %d did not catch signal %d.\n",
  1023.              waitpid, WTERMSIG(statval));
  1024.     } else if (WIFSTOPPED(statval) != 0) {
  1025.       printf("Process %d stopped due to signal %d.\n", 
  1026.              waitpid, WSTOPSIG(statval));
  1027.     } else if (WIFEXITED(statval) == 0) {
  1028.       /* Normal termination */
  1029.     } else {
  1030.       /* Terminated with exit code */
  1031.     }
  1032.   }
  1033.   fflush(stdout);
  1034. }
  1035. /* handler -- handles hypercube specific errors that do not map to UNIX. */
  1036. void handler(int type, void (*proc)())
  1037. {
  1038.   /* ignore this */
  1039. }
  1040. /* getcube -- Called by host process to gain possession of a partition in a 
  1041. ** hypercube. Note: Assuming getcube is only called once per host process. */
  1042. void getcube(char *cubename, char *cubetype, char *srmname, int keep,
  1043.               char *account)
  1044. {
  1045.   char size[8];
  1046.   int is_dimension = 0;
  1047.   int i;
  1048.   char *ptr;
  1049.   char *target;
  1050.   /* Pull out the requested number of nodes */
  1051.   ptr = cubetype;
  1052.   if (*ptr == 'd') {
  1053.     ptr++;
  1054.     is_dimension = 1;
  1055.   }
  1056.   target = size;
  1057.   i = 4;
  1058.   while (isdigit(*ptr) && (i-- != 0)) {
  1059.     *target++ = *ptr++;
  1060.   }
  1061.  
  1062.   *target = '\0';
  1063.   /* The rest of the parameters don't matter */
  1064.   /* Determine the total number of nodes */
  1065.   num_nodes = NUMBER_IN_PART; /* default size */
  1066.   sscanf(size,"%d",&num_nodes);
  1067.   if (is_dimension) {
  1068.     num_nodes = 1 << num_nodes;
  1069.   }
  1070. }
  1071. /* cubeinfo -- Passes back information about the partitions on a hypercube.
  1072. ** Input: global=0 current attached cube; 1. all cubes you own and allocated 
  1073. ** by the current host; 2. all cubes on the system from which the command was 
  1074. ** executed; 3. how cubes are allocated on all SRMs; 4. 1 addition parameter 
  1075. ** (srmname) returns info for that SRM */
  1076. int cubeinfo(struct cubetable *ct, int numslots, int global, ...)
  1077. {
  1078.   /* returns the number of cubes for which information is available */
  1079.   /* Ignore this for now */
  1080.   return 0;
  1081. }
  1082. /* relcube -- release cube gained by the getcube call. */
  1083. void relcube(char *cubename)
  1084. {
  1085.    /* Ignore this for now */
  1086. }
  1087. /* killcube -- On abort, kill off all processes in the hypercube partition */
  1088. int killcube(int node, int pid)
  1089. {
  1090.   int i;
  1091.   int statval;
  1092.   int waitpid;
  1093.   /* Force everyone to terminate */
  1094.   for (i = 0; i < child_index; i++) {
  1095.      kill(children[i], SIGTERM);
  1096.   }
  1097.  
  1098.   /* Give the children a chance to terminate */
  1099.   if (child_index > 0) sleep(1);
  1100.   /* Wait for everyone to exit, check status in case */
  1101.   for (i = 0; i < child_index; i++) {
  1102.     waitpid = wait(&statval);
  1103.     if (waitpid < 0) {
  1104.       /* No more children left */
  1105.       break;
  1106.     } else {
  1107.       if (WIFSIGNALED(statval) != 0) {
  1108.         printf("Process %d did not catch signal %d.\n",
  1109.                waitpid, WTERMSIG(statval));
  1110.       } else if (WIFSTOPPED(statval) != 0) {
  1111.         printf("Process %d stopped due to signal %d.\n",
  1112.                waitpid, WSTOPSIG(statval));
  1113.       } else if (WIFEXITED(statval) == 0) {
  1114.         /* Normal termination */
  1115.       } else {
  1116.          /* Terminated with exit code */
  1117.       }
  1118.     }
  1119.   }
  1120.   /* Clean up after ourself */
  1121.   abort_prog();
  1122.   child_index = 0;
  1123.   return 0;
  1124. }
  1125. /* init_shared_mem -- Allocates a shared memory region. Sets pointer to region
  1126. ** in this process's memory space and returns the shared memory identifier. */
  1127. static int init_shared_mem(void **pointer, int size, int key)
  1128. {
  1129.   int shmid;
  1130.   if ((shmid = shmget(key, size, 0666 | IPC_CREAT)) < 0) {
  1131.     printf("init: allocation of shared memory failed. Errno=%d\n",errno);
  1132.     printf("      mynode=%d key=%d size=%d\n",my_node,key,size);
  1133.     fflush(stdout);
  1134.     sig_terminate(0,0,NULL);
  1135.   }
  1136.   *pointer = shmat(shmid, NULL, 0);
  1137.   return shmid;
  1138. }
  1139. /* init_semaphore -- Allocates a set of semaphores and initializes them */
  1140. static int init_semaphore(int *semid, int size, int value, int key)
  1141. {
  1142.    register int i;
  1143.    if ((*semid = semget(key, size, 0666 | IPC_CREAT)) < 0) {
  1144.      printf("init: allocation of semaphores failed. Errno=%d\n",errno);
  1145.      printf("      mynode=%d key=%d size=%d\n",my_node,key,size);
  1146.      fflush(stdout);
  1147.      sig_terminate(0,0,NULL);
  1148.    }
  1149.    for (i = 0; i < size; i++) {
  1150.       if (semctl(*semid, i, SETVAL, value) < 0) {
  1151.         printf("init: initialization of semaphores failed. Errno=%d\n",errno);
  1152.         printf("      mynode=%d offset=%d value=%d\n",my_node,i,value);
  1153.         fflush(stdout);
  1154.         sig_terminate(0,0,NULL);
  1155.       }
  1156.    }
  1157.    return *semid;
  1158. }
  1159. /* semcall_all -- Perform same operation on all elements of a semaphore. */
  1160. static int semcall_all(int semid, int size, int operation)
  1161. {
  1162.    struct sembuf sbuf[NUMBER_IN_PART+1];
  1163.    register int i;
  1164.    for (i = 0; i < size; i++) {
  1165.       sbuf[i].sem_num = i;
  1166.       sbuf[i].sem_op = operation;
  1167.       sbuf[i].sem_flg = 0;
  1168.    }
  1169.    while (semop(semid, sbuf, size) < 0) {
  1170.       /* repeat operation if interrupted */
  1171.       if (errno != EINTR) {
  1172.         printf("%d: Semaphore broadcast failed. Errno = %d\n",mynode(),errno);
  1173.         abort_prog();
  1174.         exit(-1);
  1175.       }
  1176.    }
  1177.    return 0;
  1178. }
  1179. /* semcall_one -- Perform an operation on an element of a semaphore. */
  1180. static int semcall_one(int semid, int num, int operation)
  1181. {
  1182.    struct sembuf sbuf;
  1183.    sbuf.sem_num = num;
  1184.    sbuf.sem_op = operation;
  1185.    sbuf.sem_flg = 0;
  1186.    while (semop(semid, &sbuf, 1) < 0) {
  1187.       /* repeat operation if interrupted */
  1188.       if (errno != EINTR) {
  1189.         printf("%d: Semaphore failed. Errno = %d\n",mynode(), errno);
  1190.         abort_prog();
  1191.         exit(-1);
  1192.       }
  1193.    }
  1194.    return 0;
  1195. }
  1196. /* setpid -- Assigns a partition identifier to the simulated partition. */
  1197. int setpid(int id)
  1198. {
  1199.    my_group = id;
  1200.    return 0;
  1201. }
  1202. /* init_simulator -- Should be called near the beginning of an application
  1203. ** before any hypercube-related functions are called. */
  1204. void init_simulator(void)
  1205. {
  1206.    register int i, pid;
  1207.    char filename[20];
  1208.    char *temp;
  1209.    static char env[256];        /* must be static */
  1210.    struct hostent *hent;
  1211.    /* parent cm will send child cm SIGINT when a CTRL-BREAK is pressed */
  1212.    signal(SIGINT,sig_terminate);
  1213.    signal(SIGTERM,sig_terminate);
  1214.    signal(SIGQUIT,sig_terminate);
  1215.    /* Pick up the base key value from the environment */
  1216.    if ((temp = getenv("SIM_INFO")) == NULL) {
  1217.      fprintf(stderr,"init_sim: Missing environment variable\n");
  1218.      fflush(stderr);
  1219.      exit(-1);
  1220.    }
  1221.  
  1222.    strcpy(env,temp);
  1223.    if ((temp = strtok(env,",")) == NULL) {
  1224.     fprintf(stderr, "init_sim: Missing information in environment variable\n");
  1225.     fflush(stderr);
  1226.     exit(-1);
  1227.    }
  1228.    sscanf(temp,"%d",&base);
  1229.    if ((temp = strtok(NULL,",")) == NULL) {
  1230.      fprintf(stderr,"init_sim: Missing node info in environment variable\n");
  1231.      fflush(stderr);
  1232.      exit(-1);
  1233.    }
  1234.    sscanf(temp,"%d",&my_node);
  1235.    if ((temp = strtok(NULL,",")) == NULL) {
  1236.      fprintf(stderr,"init_sim: Missing pid info in environment variable\n");
  1237.      fflush(stderr);
  1238.      exit(-1);
  1239.    }
  1240.    sscanf(temp,"%d",&my_group);
  1241.    if ((temp = strtok(NULL,",")) == NULL) {
  1242.      fprintf(stderr,"init_sim: Missing number of node info in environment 
  1243.                                                                   variable\n");
  1244.      fflush(stderr);
  1245.      exit(-1);
  1246.    }
  1247.    sscanf(temp,"%d",&num_nodes);
  1248.    num_parts = (num_nodes + NUMBER_IN_PART - 1) / NUMBER_IN_PART;
  1249.    my_part = my_node / NUMBER_IN_PART;
  1250.    /* Calcuate the number of nodes in this and remaining partitions */
  1251.    i = numnodes() - (mypart() * NUMBER_IN_PART);
  1252.    nodes_in_part = min(NUMBER_IN_PART, i);
  1253.    /* Allocate shared memory */
  1254.    shmid_m = init_shared_mem(&buffer, sizeof(message) * numbuffers(), base);
  1255.    /* Allocate communications semaphores */
  1256.    init_semaphore(&msgavail, numbuffers(), 0, base+10000);
  1257.    init_semaphore(&msgfree, numbuffers(), 0, base+20000);
  1258. }
  1259. /* load -- Should be called near the beginning of a host application before any
  1260. ** hypercube-related functions are called, except for getcube. It will start
  1261. ** the appropriate number of PMs on the appropriate systems (as read from the
  1262. ** .pmrc file.) Parent process will be node 0, which has special roles on a 
  1263. ** hypercube. Remaining processes will be numbered consecutively. */
  1264. int load(char *filename, int which_node, int group_id)
  1265. {
  1266.    register int i, j, pid, size;
  1267.    char *argv[20];
  1268.  
  1269.    char base_string[20];
  1270.    char partition_number[20];
  1271.    char group_string[20];
  1272.    char number_of_nodes[20];
  1273.    char temp[256];
  1274.    char *ptr;
  1275.    struct servent *sp;
  1276.    FILE *fd;
  1277.    /* Allocate space for child pids */
  1278.    if (children == NULL) {
  1279.       if ((children = malloc(numnodes() * sizeof(int))) == NULL) {
  1280.          fprintf(stderr,"load: insufficient memory\n");
  1281.          fflush(stderr);
  1282.          return -1;
  1283.       }
  1284.    }
  1285.    /* parent will send us SIGINT when CTRL-BREAK is pressed */
  1286.    signal(SIGINT,sig_terminate);
  1287.    signal(SIGTERM,sig_terminate);
  1288.    signal(SIGQUIT,sig_terminate);
  1289.    signal(SIGCHLD, unexpected_death);
  1290.    base = getpid();
  1291.    num_parts = (num_nodes + NUMBER_IN_PART - 1) / NUMBER_IN_PART;
  1292.    if (partition == NULL) {
  1293.       if ((partition = malloc(num_parts * sizeof(subpart))) == NULL) {
  1294.          fprintf(stderr,"load: insufficient memory\n");
  1295.          fflush(stderr);
  1296.          return -1;
  1297.       }
  1298.       memset(partition, 0, num_parts * sizeof(subpart));
  1299.       if ((fd = fopen(".pmrc","r")) == NULL) {
  1300.          fprintf(stderr,"load: Missing configuration file \".pmrc\"\n");
  1301.          fflush(stderr);
  1302.          return -1;
  1303.       }
  1304.       for (i = 0; i < num_parts; i++) {
  1305.          temp[0] = '\0';
  1306.          fscanf(fd," %[^ \n] \n",temp);
  1307.          size = strlen(temp);
  1308.          if ((ptr = malloc(size+1)) == NULL) {
  1309.             fprintf(stderr,"load: Insufficent memory\n");
  1310.             fflush(stderr);
  1311.             return -1;
  1312.          }
  1313.          strcpy(ptr,temp);
  1314.          partition[i].name = ptr;
  1315.       }
  1316.       fclose(fd);
  1317.    }
  1318.  
  1319.    /* Host program's node number is the same as the number of nodes */
  1320.    my_node = numnodes();
  1321.    my_part = 0;
  1322.    /* Calcuate the number of nodes in this and remaining partitions */
  1323.    i = numnodes() - (mypart() * NUMBER_IN_PART);
  1324.    nodes_in_part = min(NUMBER_IN_PART, i);
  1325.    /* Allocate shared memory */
  1326.    if (shmid_m == -1) {
  1327.       shmid_m = init_shared_mem(&buffer, sizeof(message) * numbuffers(),base);
  1328.    }
  1329.    memset(buffer,0,sizeof(message) * numbuffers());
  1330.    /* Allocate communications semaphores */
  1331.    if (msgavail == -1) {
  1332.       init_semaphore(&msgavail, numbuffers(), 0, base+10000);
  1333.    }
  1334.    if (msgfree == -1) {
  1335.       init_semaphore(&msgfree, numbuffers(), 0, base+20000);
  1336.    }
  1337.    /* Split into node processes */
  1338.    fflush(stdout);
  1339.    fflush(stderr);
  1340.    /* Start the local and remote Partition Managers */
  1341.    for (i = 0; i < num_parts; i++) {
  1342.      if ((pid = fork()) < 0) {
  1343.        /* Can't create all the children! */
  1344.        killcube(0,0);
  1345.        fprintf(stderr, "LOAD: unable to create Partition Managers\n");
  1346.        return -1;
  1347.      } else if (pid == 0) {
  1348.         /* I'm the child process */
  1349.         my_node = -1;
  1350.         /* Start the Partition Managers */
  1351.         if (i == 0) {
  1352.            argv[0] = "pm";
  1353.            argv[1] = filename;
  1354.            sprintf(partition_number, "%d", i);
  1355.            argv[2] = partition_number;
  1356.            sprintf(base_string,"%d", base);
  1357.            argv[3] = base_string;
  1358.            sprintf(group_string, "%d", group_id);
  1359.            argv[4] = group_string;
  1360.            sprintf(number_of_nodes, "%d", numnodes());
  1361.            argv[5] = number_of_nodes;
  1362.            for (i = 0; i < num_parts; i++) {
  1363.               argv[i+6] = partition[i].name;
  1364.            }
  1365.            argv[i+6] = NULL;
  1366.            execvp("pm",argv);
  1367.            /* If we get here, we had a problem */
  1368.            printf("execvp of PM 0 failed. errno=%d\n",errno);
  1369.            fflush(stdout);
  1370.            exit(-1);
  1371.         } else {
  1372.            argv[0] = "rsh";
  1373.            argv[1] = partition[i].name;
  1374.            argv[2] = "pm";
  1375.            argv[3] = filename;
  1376.            sprintf(partition_number, "%d", i);
  1377.            argv[4] = partition_number;
  1378.            sprintf(base_string,"%d", base);
  1379.            argv[5] = base_string;
  1380.            sprintf(group_string, "%d", group_id);
  1381.            argv[6] = group_string;
  1382.            sprintf(number_of_nodes, "%d", numnodes());
  1383.            argv[7] = number_of_nodes;
  1384.            for (i = 0; i < num_parts; i++) {
  1385.               argv[i+8] = partition[i].name;
  1386.            }
  1387.            argv[i+8] = NULL;
  1388.            execvp("rsh",argv);
  1389.            /* If we get here, we had a problem */
  1390.            printf("execvp of PM 0 failed. errno=%d\n",errno);
  1391.            fflush(stdout);
  1392.            exit(-1);
  1393.         }
  1394.      } else {
  1395.        /* I'm the parent process */
  1396.        children[child_index++] = pid;
  1397.      }
  1398.    }
  1399. }
  1400. /** Environment Information (External and Internal) **/
  1401. /* availmem -- returns amount of memory available */
  1402. int availmem(void)
  1403. {
  1404.   return 0;
  1405. }
  1406. /* nodedim -- Returns the dimension of the simulated hypercube */
  1407. int nodedim(void)
  1408. {
  1409.   unsigned int i, temp;
  1410.   temp = num_nodes;
  1411.   i = 0;
  1412.   while (temp != 0) {
  1413.     temp >> 1;
  1414.     i++;
  1415.   }
  1416.   return i;
  1417. }
  1418. /* numnodes -- Returns the number of simulated nodes */
  1419. int numnodes(void)
  1420. {
  1421.   return num_nodes;
  1422. }
  1423. /* numparts -- number of simulator partitions */
  1424. static int numparts(void)
  1425. {
  1426.   return num_parts;
  1427. }
  1428. /* mypart --  Simulator partition this process is in */
  1429. static int mypart(void)
  1430. {
  1431.   return my_part;
  1432. }
  1433. /* partof -- Determines which simulator partition a given node is member of */
  1434. static int partof(int n)
  1435. {
  1436.   if (n == myhost()) {
  1437.     return 0;
  1438.   } else {
  1439.  
  1440.     return n / NUMBER_IN_PART;
  1441.   }
  1442. }
  1443. /* numbuffers -- Number of buffers in this simulator partition */
  1444. static int numbuffers(void)
  1445. {
  1446.   if (mypart() == 0) {
  1447.     return nodes&us.in&us.part + 2;
  1448.   } else {
  1449.     return nodes&us.in&us.part + 1;
  1450.   }
  1451. }
  1452. /* mybuffer -- returns the index for this process's buffer */
  1453. static int mybuffer(void)
  1454. {
  1455.   if (mynode() == myhost()) {
  1456.     return nodes&us.in&us.part+1;
  1457.   } else {
  1458.     return mynode() % NUMBER_IN_PART;
  1459.   }
  1460. }
  1461. /* bufferof -- Returns the buffer offset of the given node. The host is always 
  1462. ** the last buffer in partition 0. The PM is always second to last buffer */
  1463. static int bufferof(int n)
  1464. {
  1465.    if (mypart() != partof(n)) {
  1466.       return nodes_in_part;             /* Return the buffer of PM */
  1467.    } else if (n == myhost()) {
  1468.       return nodes_in_part + 1;         /* This partition, buffer of host */
  1469.    } else {
  1470.       return n % NUMBER_IN_PART;        /* This partition, buffer of node */
  1471.    }
  1472. }
  1473. /* mynode -- Returns the node number for this process */
  1474.  
  1475. int mynode(void)
  1476. {
  1477.   return my_node;
  1478. }
  1479. /* mypid -- Returns the group number */
  1480. int mypid(void)
  1481. {
  1482.   return my_group;
  1483. }
  1484. /* myhost -- Returns the node number of the host */
  1485. int myhost(void)
  1486. {
  1487.   return numnodes();
  1488. }
  1489. /** Communications **/
  1490. /* cread -- Special read for files on hypercube's high-speed disk system. We 
  1491. just issue a standard read instead. */
  1492. int cread(int fd, void *buffer, int size)
  1493. {
  1494.   return read(fd, buffer, size);
  1495. }
  1496. /* gdsum -- Sum individual elements of an array on all processes */
  1497. void gdsum(double x[], long elements,  double work[])
  1498. {
  1499.   register int i,j;
  1500.   double temp;
  1501.   if ((mybuffer()) == 0) {
  1502.      /* The first node in each partition sums the local data */
  1503.      if (nodes_in_part > 1) {
  1504.         /* Only sum when we aren't the only ones in the partition */
  1505.         for (i = 1; i < nodes_in_part; i++) {
  1506.  
  1507.            /* Get the next set of numbers to sum */
  1508.            crecv(-2, work, elements * sizeof(double));
  1509.            for (j = 0; j < elements; j++) {
  1510.               x[j] += work[j];
  1511.            }
  1512.         }
  1513.      }
  1514.      /* Node 0 sums for all partitions */
  1515.      if (mynode() == 0) {
  1516.         /* Only sum if there are more than one partition */
  1517.         if (numparts() > 1) {
  1518.            for (i = 1; i < numparts(); i++) {
  1519.               /* Get the next set of numbers to sum */
  1520.               crecv(-3, work, elements * sizeof(double));
  1521.  
  1522.               for (j = 0; j < elements; j++) {
  1523.                  x[j] += work[j];
  1524.               }
  1525.            }
  1526.         }
  1527.         /* Only broadcast if there is more than one node */
  1528.         if (nodes_in_part > 1) {
  1529.            /* Broadcast the results */
  1530.            csend(-4,x,elements * sizeof(double),-1,mypid());
  1531.         }
  1532.      } else {
  1533.         /* Each partition needs to send the partial sum to node 0 */
  1534.         csend(-3,x,elements * sizeof(double),0,mypid());
  1535.         /* Wait for the answer */
  1536.         crecv(-4,x,elements * sizeof(double));
  1537.      }
  1538.   } else {
  1539.     /* Send the data to local node to do the summation */
  1540.     csend(-2,x,elements * sizeof(double),mypart()*4,mypid());
  1541.     /* Wait for the answer */
  1542.     crecv(-4,x,elements * sizeof(double));
  1543.   }
  1544. }
  1545. /* getmsg -- Wait until a message is available. OUTPUT: next_message, set to 
  1546. ** the message found of the proper type */
  1547. static void getmsg(void)
  1548. {
  1549.    int i;
  1550.    /* Only wait if a message is not already selected */
  1551.    if (next_message != -1) return;
  1552.  
  1553.    /* Wait for a message for me */
  1554.    semcall_one(msgavail, mybuffer(), -1);
  1555.    /* Search for those messages that are for me */
  1556.    for (i = 0; i < numbuffers(); i++) {
  1557.      if (buffer[i].valid[mybuffer()] == 1) {
  1558.        if ((buffer[i].dnode == mynode()) || (buffer[i].dnode == -1)) {
  1559.          next_message = i;
  1560.          return;
  1561.        }
  1562.      }
  1563.    }
  1564. }
  1565. /* cprobe -- Wait until a message of a specific type is available. OUTPUT:
  1566. ** next_message, set to the message found of the proper type */
  1567. void cprobe(int type)
  1568. {
  1569.    int i,j;
  1570.    /* Make sure all pending writes in application have occured */
  1571.    fflush(stdout);
  1572.    fflush(stderr);
  1573.    /* See if a specific type was requested */
  1574.    if (type == -1) {
  1575.       getmsg();
  1576.       return;
  1577.    } else if ((next_message != -1) && (type == buffer[next_message].type)) {
  1578.       /* message was already located */
  1579.       return;
  1580.    } else {
  1581.       while (1) {
  1582.          /* Wait for a message for me */
  1583.          semcall_one(msgavail, mybuffer(), -1);
  1584.          /* Search for those messages that are for me and is the type I need */
  1585.          for (i = 0; i < numbuffers(); i++) {
  1586.            if (buffer[i].valid[mybuffer()] == 1) {
  1587.              if ((buffer[i].dnode == mynode()) || (buffer[i].dnode == -1)) {
  1588.                if (buffer[i].type == type) {
  1589.                  next_message = i;
  1590.                  /* Put back all skipped messages back */
  1591.                  for (j = 0; j < numbuffers(); j++) {
  1592.                     if (buffer[j].valid[mybuffer()] == 2) {
  1593.                        buffer[j].valid[mybuffer()] = 1;
  1594.                        semcall_one(msgavail, mybuffer(), 1);
  1595.                     }
  1596.  
  1597.                  }
  1598.                  return;
  1599.                } else {
  1600.                   /* Mark the message so that we don't look at it again */
  1601.                   buffer[i].valid[mybuffer()] = 2;
  1602.                }
  1603.              }
  1604.            }
  1605.          }
  1606.       }
  1607.    }
  1608. }
  1609. /* infocount -- Return the length of the message that will be received. */
  1610. int infocount(void)
  1611. {
  1612.    getmsg();
  1613.    return buffer[next_message].t_length;
  1614. }
  1615. /* infonode -- Returns the node that sent the message */
  1616. int infonode(void)
  1617. {
  1618.   getmsg();
  1619.   return buffer[next_message].snode;
  1620. }
  1621. /* infopid -- Returns the group (pid) of the node that sent the message */
  1622. int infopid(void)
  1623. {
  1624.   getmsg();
  1625.   return buffer[next_message].spid;
  1626. }
  1627. /* csend -- Synchronous message sending between two nodes. If the target node
  1628. ** number is -1, then the message is broadcasted to all nodes. Limitations:
  1629. ** Assumes that the message buffer is free to use. In other words, if
  1630. ** an asynchronous send was previously done, we assume that a msgwait
  1631. ** was done to ensure the previous message reached its destination. */
  1632. int csend(int type, void *msg, int length, int target_node, int group)
  1633. {
  1634.    int i,j, sent_length = 0;
  1635.    char *source;
  1636.    i = mybuffer();
  1637.    /* Fill in the message */
  1638.    source = msg;
  1639.    buffer[i].type = type;
  1640.    buffer[i].dnode = target_node;
  1641.    buffer[i].spid = mypid();
  1642.    buffer[i].snode = mynode();
  1643.    buffer[i].t_length = length;
  1644.    while (length > 0) {
  1645.       /* Divide the message into smaller chunks */
  1646.       buffer[i].length = min(MAX_MESSAGE_SIZE, length);
  1647.       memcpy(buffer[i].msg, source, buffer[i].length);
  1648.       source += buffer[i].length;
  1649.       sent_length += buffer[i].length;
  1650.       length -= buffer[i].length;
  1651.       /* Tell the node(s) the message is there */
  1652.       if (target_node == -1) {
  1653.          /* Broadcast the message to nodes in this partition */
  1654.          /* and to the process manager */
  1655.          for (j=0; j < nodes&us.in&us.part + 1; j++) {
  1656.             buffer[i].valid[j] = 1;
  1657.          }
  1658.          semcall_all(msgavail,nodes&us.in&us.part+1, 1);
  1659.          /* Of course, we already have the message */
  1660.          semcall_one(msgavail,i, -1);
  1661.          /* Wait until everyone receives the message */
  1662.          semcall_one(msgfree, i, -nodes&us.in&us.part);
  1663.       } else {
  1664.          /* Point to point to another node */
  1665.          j = bufferof(target_node);
  1666.          buffer[i].valid[j] = 1;
  1667.          semcall_one(msgavail, j, 1);
  1668.          /* Wait until it is received */
  1669.          semcall_one(msgfree, i, -1);
  1670.       }
  1671.    }
  1672.    return sent_length;
  1673. }
  1674. /* crecv -- Synchronous message reception between two nodes. */
  1675. int crecv(int type, void *buf, int len)
  1676. {
  1677.    int recv_len = 0, copy_len = 0, temp_len, total_len;
  1678.    int recv_node;
  1679.    char *target;
  1680.    /* Get a message of this type */
  1681.    cprobe(type);
  1682.    target = buf;
  1683.    total_len = buffer[next_message].t_length;
  1684.    recv_node = buffer[next_message].snode;
  1685.    do {
  1686.       if (recv_node != buffer[next_message].snode) {
  1687.          /* Message is from another node, put off receiving */
  1688.          buffer[next_message].valid[mybuffer()] = 3;
  1689.       } else {
  1690.          /* Message is from same node, add it the previous messages */
  1691.          recv_len += buffer[next_message].length;
  1692.          temp_len = min(len - copy_len, buffer[next_message].length);
  1693.          if (temp_len > 0) {
  1694.             memcpy(target, buffer[next_message].msg, temp_len);
  1695.             target += temp_len;
  1696.          }
  1697.          copy_len += buffer[next_message].length;
  1698.          /* Acknowledge the receipt of the message */
  1699.          buffer[next_message].valid[mybuffer()] = 0;
  1700.          semcall_one(msgfree, next_message, 1);
  1701.       }
  1702.       /* Indicate that no message has been selected */
  1703.       next_message = -1;
  1704.       if (recv_len < total_len) {
  1705.          cprobe(type);
  1706.       }
  1707.    } while (recv_len < total_len);
  1708.    /* Scan buffers to restore any skipped messages */
  1709.    for (i = 0; i < numbuffers(); i++) {
  1710.       if (buffer[i].valid[mybuffer()] == 3) {
  1711.          buffer[i].valid[mybuffer()] = 1;
  1712.          semcall_one(msgavail, mybuffer(), 1);
  1713.       }
  1714.    }
  1715.    return total_len;
  1716. }
  1717. /* isend -- Asynchronous message sending between two nodes. If the target node
  1718. ** number is -1, then the message is broadcasted to all nodes. Limitations:
  1719. ** Assumes that the message buffer is free to use. In other words, if
  1720. ** an asynchronous send was previously done, we assume that a msgwait
  1721. ** was done to ensure the previous message reached its destination. */
  1722. int isend(int type, void *msg, int length, int target_node, int group)
  1723. {
  1724.    int i,j, sent_length = 0;
  1725.    char *source;
  1726.    i = mybuffer();
  1727.    buffer[i].type = type;
  1728.    buffer[i].dnode = target_node;
  1729.    buffer[i].spid = mypid();
  1730.    buffer[i].snode = mynode();
  1731.    buffer[i].t_length = length;
  1732.    while (length > 0) {
  1733.       /* Divide the message into smaller chunks */
  1734.       buffer[i].length = min(MAX_MESSAGE_SIZE, length);
  1735.       memcpy(buffer[i].msg, source, buffer[i].length);
  1736.       source += buffer[i].length;
  1737.       sent_length += buffer[i].length;
  1738.       length -= buffer[i].length;
  1739.       /* Tell the node(s) the message is there */
  1740.       if (target_node == -1) {
  1741.          /* Broadcast the message to nodes in this partition */
  1742.          /* and to the process manager */
  1743.          for (j=0; j < nodes_in_part+1; j++) {
  1744.             buffer[i].valid[j] = 1;
  1745.          }
  1746.          semcall_all(msgavail,nodes_in_part+1, 1);
  1747.          /* Of course, we already have the message */
  1748.          semcall_one(msgavail,i, -1);
  1749.          /* Wait for acknowledge on all but the last part */
  1750.          if (length > 0) {
  1751.             /* Wait until everyone receives the message */
  1752.             semcall_one(msgfree, i, -nodes_in_part);
  1753.          }
  1754.       } else {
  1755.          /* Point to point to another node */
  1756.          j = bufferof(target_node);
  1757.          buffer[i].valid[j] = 1;
  1758.          semcall_one(msgavail, j, 1);
  1759.          /* Wait for acknowledge on all but the last part */
  1760.          if (length > 0) {
  1761.             /* Wait until it is received */
  1762.             semcall_one(msgfree, i, -1);
  1763.          }
  1764.       }
  1765.  
  1766.    }
  1767.    /* Return which buffer needs to be waited on */
  1768.    return i;
  1769. }
  1770. /* irecv -- Asynchronous message reception between two nodes. Returns message
  1771. ** identifier for acknowledging the message. */
  1772. int irecv(int type, void *buf, int len)
  1773. {
  1774.    int mid;
  1775.    int recv_len = 0, copy_len = 0, temp_len, total_len;
  1776.    char *target;
  1777.    /* Get a message of this type */
  1778.    cprobe(type);
  1779.    mid = next_message;
  1780.    target = buf;
  1781.    total_len = buffer[next_message].t_length;
  1782.    do {
  1783.       recv_len += buffer[next_message].length;
  1784.       temp_len = min(len - copy_len, buffer[next_message].length);
  1785.       if (temp_len > 0) {
  1786.          memcpy(target, buffer[next_message].msg, temp_len);
  1787.          target += temp_len;
  1788.       }
  1789.       copy_len += buffer[next_message].length;
  1790.       /* Acknowledge all but last partial message */
  1791.       if (recv_len < total_len) {
  1792.          /* Acknowledge the receipt of the message */
  1793.          buffer[next_message].valid[mybuffer()] = 0;
  1794.          semcall_one(msgfree, next_message, 1);
  1795.       }
  1796.       /* Indicate that no message has been selected */
  1797.       next_message = -1;
  1798.       if (recv_len < total_len) {
  1799.          cprobe(type);
  1800.       }
  1801.    } while (recv_len < total_len);
  1802.    return mid;
  1803. }
  1804. /* msgwait -- Wait for a message to be received by the target node(s) */
  1805. void msgwait(int mid)
  1806.  
  1807. {
  1808.   if (mid == mybuffer()) {
  1809.     /* Then it was a send to another node */
  1810.     if (buffer[mid].dnode == -1) {
  1811.       /* Wait for everyone to receive the message */
  1812.       semcall_all(msgfree, mid, -nodes&us.in&us.part);
  1813.     } else {
  1814.       semcall_one(msgfree, mid, -1);
  1815.     }
  1816.   } else {
  1817.     /* It was a receive from another node */
  1818.     semcall_one(msgfree, mid, 1);
  1819.   }
  1820. }
  1821. /* flushmsg -- Forces the removal of pending messages to a node */
  1822. void flushmsg(int type, int target_node, int group)
  1823. {
  1824.   /* Do nothing for now */
  1825.   fflush(stdout);
  1826.   fflush(stderr);
  1827. }
  1828. /* mclock -- Return time in milliseconds. */
  1829. unsigned long mclock(void)
  1830. {
  1831.   unsigned long current_time;
  1832.   time(¤t_time);
  1833.   return current_time * 1000;
  1834. }
  1835.  
  1836.  
  1837.  
  1838.